The core of Blurr is the Blurr Transform Spec (BTS).
There are 2 types of BTSs - Streaming and Window BTS.
The Streaming BTS takes raw data and converts it into blocks.
The Window BTS generates data relative to a block. This is especially useful for building features for machine learning models.
Blurr is in Developer Preview so the spec is likely to change
The anatomy of a Streaming BTS
The raw event being processed is defined as source
in the BTS. Event parameters are accessed as source.<key>
such as source.user_id
. BTSs are processed in a python 3.6 environment and the expressions used for field values are executed as python statements.
Header
Type: Blurr:Transform:Streaming
Version: '2018-03-07'
Description: Create blocks from streaming Raw Data
Name: offer_ai_v1
Identity: source.user_id
Time: parse_time(source.event_time, 'YYYY/mm/dd HH:MM:SS')
When: source.package_version = '1.0'
Key | Description | Allowed values | Required |
---|---|---|---|
Type | The type of BTS - Streaming or Window | Blurr:Transform:Streaming or Blurr:Transform:Window |
Required |
Version | Version number of the BTS, used by the Blurr Transform Library to parse the template | Specific BTS versions only 2018-03-01 |
Required |
Description | Text description for the BTS | Any string |
Optional |
Name | Unique name for the BTS | Any string |
Required |
Identity | The dimension in the raw data around which data is aggregated | source.<field> |
Required |
Time | Define what field in the source data to use as the time of the event. This is available as time for other expressions to use. If source does not contain time then the datetime.utcnow() expression can be used to retrieve the current utc datetime |
Any python datetime object |
Optional |
When | Boolean expression that defines which raw data should be processed | Any boolean expression |
Optional |
Store
At the end of a transform, data is persisted in the store. The Blurr Transform Library works with an abstraction of storage which isolates the actual storage tier from how it works with the DTL.
Stores:
- Type: Blurr:Store:Memory
Name: hello_world_store
Key | Description | Allowed values | Required |
---|---|---|---|
Type | The destination data store | Blurr:Store:Memory . More Stores such as S3 and DynamoDB coming soon |
Required |
Name | Name of the store, used for internal referencing within the BTS | Any string |
Required |
Import
The import section can be used to specify arbitrary python code that can be used in the BTS.
As shown below Time
key is using datetime
and timezone
and these
are being specified in the import section.
Import:
- { Module: datetime, Identifiers: [ datetime, timezone ]}
Time: datetime.fromtimestamp(source.timestamp, timezone.utc)
Similarly any custom python code written can be incorporated into the BTS:
Import:
- { Module: my_module, Identifiers: [ my_function1, my_function2 ]}
After this my_function1
and my_function2
can be used in the BTS. This is equivalent
to a from my_module import my_function1, my_fuction2
python statement.
Key | Description | Allowed values | Required |
---|---|---|---|
Module | Python Module to import | Any module that is available to the python execution runtime | Required |
Identifiers | List of identifiers to import | The identifiers provided should be available in the module. If not identifiers are provided then the module is imported directly. | Optional |
Some examples and its equivalent python statement
Import | Python statement |
---|---|
Module: my_module, Identifiers: [ my_function1, my_function2 ] |
from my_module import my_function1, my_fuction2 |
Module: my_module |
import my_module |
Module: my_module as mod_name |
import my_module as mod_name |
Module: my_module, Identifiers: [ my_function1 as func1 ] |
from my_module import my_function1 as func1 |
Aggregates
Aggregates defines groups of data that are either in a one-to-one relationship with the Identity or a in a many-to-one relationship.
There are 3 types of Aggregates in a Streaming BTS.
Identity Aggregate
Blurr:Aggregate:Identity
. Fields in the Identity Aggregates are in a one-to-one relationship with the identity. There is a single record that stores these fields and change to these fields overwrite the previous value. There are no historical records kept for state changes.
Aggregates:
- Type: Blurr:Aggregate:Identity
Name: user
Store: offer_ai_dynamo
When: source.event_id in ['app_launched', 'user_updated']
Fields:
- Name: user_id
Type: string
Value: source.customer_identifier
- Name: country
Value: source.country
Key | Description | Allowed values | Required |
---|---|---|---|
Type | Type of Aggregate | Blurr:Aggregate:Identity , Blurr:Aggregate:Block , Blurr:Aggregate:Variable |
Required |
Name | Name of the Aggregate | Any string that does not start with _ or run_ . , unique within the BTS |
Required |
Store | Name of the Store in which to create the Aggregate | Stores defined in the BTS | Required |
When | Boolean expression that defines which raw events to process | Any boolean expression |
Optional |
An Aggregate
contains fields
for the information being stored. Identity Aggregate
fields are especially useful for data that is relatively static over time - like a user's country.
Each field in an Aggregate
has 4 properties.
Key | Description | Allowed values | Required |
---|---|---|---|
Name | Name of the field | Any string that does not start with _ or run_ . |
Required |
Type | Type of data being stored | integer , boolean , string , datetime , float , map , list , set |
Required |
Value | Value of the field | Any python expression, and must match the Type | Required |
When | Boolean expression that defines which raw events to process | Any boolean expression |
Optional |
All fields in the Aggregate are encapsulated in a Aggregate object. The object is available in the DTL, which is the python environment processing the BTS. Field values can be accessed using AggregateName.FieldName
An Identity Aggregate
can also have Dimensions
to split the aggregation along those dimensions. Dimensions
are also a
list of Fields
with only integer
, boolean
and string
types supported. Each raw event that is evaluated should
contain the data needed to determine all the Dimensions
specified.
Example:
Aggregates:
- Type: Blurr:Aggregate:Identity
Name: user
Store: offer_ai_dynamo
When: source.event_id in ['app_launched', 'user_updated']
Dimensions:
- Name: level
Type: string
Value: source.level
Fields:
- Name: events
Type: int
Value: user.events + 1
- Name: country
Value: source.country
Block Aggregate
Blurr:Aggregate:Block
. Fields in the Block Aggregates are in a one-to-many relationship with the identity. These fields are aggregated together in blocks based on the Dimensions
fields specified.
Blurr:Aggregate:Block
is similar to Blurr:Aggregate:Identity
with the difference being that in the case of Block aggregates Blurr internally
keeps track on the time-range of the block and Blurr:Aggregate:Block
aggregates are expected to be spread across time.
- Type: Blurr:Aggregate:Block
Name: session
When: source.app_version > '3.7'
Dimensions:
- Name: session_id
Type: string
Value: source.session_id
Fields:
- Name: presents_wrapped
Type: integer
Value: session.presents_wrapped + 1
Filter: source.event_name == 'wrap_present'
Key | Description | Allowed values | Required |
---|---|---|---|
Type | Type of Aggregate | Blurr:Aggregate:Identity , Blurr:Aggregate:Block , Blurr:Aggregate:Variable |
Required |
Name | Name of the Aggregate | Any string that does not start with _ or run_ . , unique within the BTS |
Required |
When | Boolean expression that defines which raw events to process | Any boolean expression |
Optional |
Activity Aggregate
Blurr:Aggregate:Activity
.Activity Aggregate is a specialization of the Block Aggregate where the inactivity based aggregate splitting condition is
specified by SeparateByInactiveSeconds
.
Variable
Blurr:Aggregate:Variable
. Variable Aggregates are temporary variables that can be used in other data blocks. Variables aim to reduce code duplication and improve readability. They are useful for cleansing / modifying / typecasting source elements and representing complex filter conditions that evaluate to a binary value.
- Type: Blurr:Aggregate:Variable
Name: vars
Fields:
- Name: item_price_micro
Type: integer
Value: int(float(source.item_price) * 1000000)
This can be accessed as vars.item_price_micro
anywhere in the Streaming BTS
Window BTS
Windowing concepts in Blurr are similar to the concepts in Apache Spark/Flink.
Once the Streaming BTS has run, the raw data is split into blocks. The Window BTS generates data relative to a block.
Say we're training a model to predict what sales offer to show to a user in a session. We want the features of this model to be purchases_prev_week
, games_played_last_session
and win_ratio_last_session
. We'd like to predict next_7_days_revenue
.
When training a model, the data needs to be organized around the decision point. We want to know the value of the features at the point an offer is shown. So if the offer is shown during session 4, the features need to be relative to session 4. And the optimization function (total revenue over next 7 days) needs to be relative to session 4 as well.
The decision point is the Anchor. A window defines segments of data relative to the anchor. In this case, we need the features and optimization function relative to when an offer is shown.
The anatomy of a Window BTS
Header
Type: Blurr:Transform:Window
Version: '2018-03-01'
Description: Generate features around an Anchor
Name: Window Example
SourceBTS: offer_ai_v1
Key | Description | Allowed values | Required |
---|---|---|---|
Type | The type of BTS - Streaming or Window | Blurr:Transform:Streaming or Blurr:Transform:Window |
Required |
Version | Version number of the BTS, used by the Blurr Transform Library to parse the template | Specific BTS versions only 2018-03-01 |
Required |
Description | Text description for the BTS | Any string |
Optional |
Name | Unique name for the Window BTS | Any string |
Required |
SourceBTS | The name of the streaming BTS upon which this window BTS will be executed | Valid streaming BTS | Required |
Anchor
An Anchor represents a decision made at a point in time. For example: when a model to pick the offer price for a user is being used, then this decision being made is an Anchor.
This is a vital concept to align the way data processing is done for training data generation and for prediction. The training data is generated as if the only information that was available was up to the Anchor point. Anything after the Anchor point can only be used as a label (which is what the model will predict).
Anchor:
Condition: offer_ai_v1.game_stats.offer_type != ''
Max: 1
Condition is a python expression which returns a boolean
value to determine if an anchor condition exists in the block being processed. Field values in a block are accessed as StreamingBTSName.AggregateName.FieldName
.
Max defines the maximum number of output rows to be generated every time the Window BTS is run.
When the number of blocks that satisfy the anchor condition is more than the max value specified, then, a ‘max’ number of satisfying sessions are picked randomly. Max can be used to prevent oversampling of more active users.
Store
Stores:
- Type: Blurr:Store:Memory
Name: hello_world_store
Key | Description | Allowed values | Required |
---|---|---|---|
Type | The destination data store | Blurr:Store:Memory . More Stores such as S3 and DynamoDB coming soon |
Required |
Name | Name of the store, used for internal referencing within the BTS | Any string |
Required |
Aggregates
All Aggregate operations that are performed in a window BTS can only use the following available data:
- Anchor block - Block that satisfies the anchor condition. The fields from the anchor block can be accessed as
anchor.FieldName
. - Identity Aggregate - Identity aggregates available from the source Streaming BTS. The fields from an Identity Aggregate can be accessed as
StreamingBTSName.IdentityAggregateName.Field Name
. - A window of blocks around the anchor block - A list of blocks from a Block Aggregate before or after the anchor block based on the window defined. A field from the list of blocks is referenced as
WindowName.FieldName
.
Window Aggregate
Aggregates:
Aggregates:
- Type: Blurr:Aggregate:Window
Name: last_session
# Defines a processing window for the rollup. Supported window types are Day, Hour and Count
WindowType: count
# Negative values are backward from the Anchor
WindowValue: -1
Source: offer_ai_v1.game_stats
Fields:
# The output will contain a column for last_session.games_played
- Name: games_played
Type: integer
# source.games_played is a list containing the games_played information from the
# sessions that fall in the window defined above. In this case, because of a count
# window of 1 the source.games_played list only contains a single data point.
Value: source.games_played[0]
Key | Description | Allowed values | Required |
---|---|---|---|
Type | Type of Aggregate | Blurr:Aggregate:Window , Blurr:Aggregate:Variable |
Required |
Name | Name of the Aggregate | Any string , unique within the BTS |
Required |
WindowType | The type of window to use around the anchor block | day , hour , count |
Optional. A Window Aggregate can be defined without a Window |
WindowValue | The number of days, hours or blocks to window around the anchor block | Integer | Optional. A Window Aggregate can be defined without a Window |
Source | The Block Aggregate or Activity Aggregate (defined in the Streaming BTS) on which the window operations should be performed | Valid Block of Activity Aggregate | Required |
All functions defined on windows work on a list of values. For e.g. if a session contains a games_played
field and a last_week
window is defined on it, then last_week.games_played
represents the list of values from last week's sessions.
Important: Window operations using Window Aggregate
do not include the Anchor block itself.
Each field in a Window Aggregate has 3 properties.
Key | Description | Allowed values | Required |
---|---|---|---|
Name | Name of the field | Any string |
Required |
Type | Type of data being stored | integer , boolean , string , datetime , float , map , list , set |
Required |
Value | Value of the field | Any python expression, and must match the Type | Required |
Variable
The Variable Aggregate works exactly the same as in the Streaming BTS.
Order of Aggregates
Aggregates can be defined in any order. However, if field values are referenced within the BTS, they must be defined in the order in which they are referenced. For example, if a Block Aggregate
uses a Variable
, then the Variable
Aggregate should be defined before the Block Aggregate
so that the Variable
is processed first and available to the Block Aggregate
when the Block Aggregate
is being processed.
Processing field values
All field values must be valid python expressions. These expressions are executed by the Blurr Transform Library (DTL), which uses a Python 3.6 interpreter.
Errors
Field values are not recorded when:
- Evaluation results in an error
- A None value is returned / no values are returned
- Value returned is of a type different from the type of the field and casting the value to the required type fails.
Types
The following types are supported.
Type | Description | Expected format | Default |
---|---|---|---|
integer |
integers! | 2 | 0 |
boolean |
boolean! | true | false |
string |
default type | 'this is a string' | '' |
datetime |
datetime | any datetime object in python | - |
float |
floating point numbers with decimals | 1.2 | 0.0 |
map |
Any type to type maps | ‘{“a”: 1, “b”: 2}’ | empty map |
list |
lists can contain any type of data | ‘[1,2,3]’ | empty list |
set |
contains only unique elements that are integer , long , float and strings |
- | empty set |